/**
 * Copyright (c) 2015-2017, Winter Lau (javayou@gmail.com).
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.crazymaker.l2cache.cluster.level2;

import com.crazymaker.l2cache.cluster.level2.canal.CannalBinlogMessageListener;
import com.crazymaker.l2cache.cluster.level2.canal.DataLoader;
import com.crazymaker.l2cache.manager.CacheObject;
import com.crazymaker.l2cache.manager.CacheProviderHolder;
import com.crazymaker.l2cache.manager.Command;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static com.crazymaker.springcloud.common.util.StringUtils.genRandomSrc;

/**
 * Uses RocketMQ to deliver data notifications between the nodes of a cluster
 * (intended for scenarios with particularly strict data-consistency requirements).
 * <p>
 * The policy owns one producer and two push consumers:
 * <ul>
 *   <li>a binlog consumer, subscribed to the binlog topic and handled by a
 *       {@link CannalBinlogMessageListener};</li>
 *   <li>a command consumer, subscribed to the command topic and handled by this
 *       class through {@link #consumeMessage(List, ConsumeConcurrentlyContext)}.</li>
 * </ul>
 *
 * @author Winter Lau (javayou@gmail.com)
 */
public class RocketMQL2Policy implements L2Policy, MessageListenerConcurrently {

    private static final Logger log = LoggerFactory.getLogger(RocketMQL2Policy.class);

    /** Timeout, in milliseconds, handed to the producer when a command is sent. */
    private static final long SEND_TIMEOUT_MILLIS = 100000L;

    /** Command source identifier, randomly generated; intended to be unique per node. */
    private final int localCommandId = genRandomSrc();

    /** Set by {@link #connect(Properties, CacheProviderHolder)}; null until then. */
    private CacheProviderHolder holder;

    private final String hosts;
    private final String topicBinlog;
    private final String topicCmd;
    private final DefaultMQProducer producer;
    private final DefaultMQPushConsumer binlogConsumer;
    private final DefaultMQPushConsumer cmdConsumer;
    private final CannalBinlogMessageListener binlogMessageListener;

    /**
     * Builds the producer and both consumers; nothing is started until
     * {@link #connect(Properties, CacheProviderHolder)} is called.
     *
     * @param props configuration; reads {@code hosts} (name server address),
     *              {@code binLogGroupName} (default {@code "binLogGroupName"}),
     *              {@code topicBinlog} (default {@code "canal_log"}),
     *              {@code topicCmd} (default {@code "topicCmd"}) and
     *              {@code cmdGroupName} (default {@code "cmdGroupName"})
     */
    public RocketMQL2Policy(Properties props) {
        this.hosts = props.getProperty("hosts");
        String binLogGroupName = props.getProperty("binLogGroupName", "binLogGroupName");

        // The producer is created with the binlog group name, as in the original code.
        this.producer = new DefaultMQProducer(binLogGroupName);
        this.producer.setNamesrvAddr(this.hosts);

        this.topicBinlog = props.getProperty("topicBinlog", "canal_log");
        this.binlogConsumer = newClusteringConsumer(binLogGroupName);

        this.topicCmd = props.getProperty("topicCmd", "topicCmd");
        String cmdGroupName = props.getProperty("cmdGroupName", "cmdGroupName");
        this.cmdConsumer = newClusteringConsumer(cmdGroupName);

        // Created exactly once (the original built it twice and discarded the first instance).
        this.binlogMessageListener = new CannalBinlogMessageListener(this);
    }

    /** Creates a push consumer for the given group, reading from the last offset in CLUSTERING mode. */
    private DefaultMQPushConsumer newClusteringConsumer(String groupName) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setNamesrvAddr(this.hosts);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        return consumer;
    }

    /**
     * Tells whether a command was published by this policy instance.
     *
     * @param cmd the command to inspect
     * @return {@code true} when the command's source equals this instance's identifier
     */
    @Override
    public boolean isLocalCommand(Command cmd) {
        return cmd.getSrc() == localCommandId;
    }

    /**
     * Removes entries from the local level-1 cache.
     *
     * @param region region name
     * @param keys   cache keys
     */
    public void evict(String region, String... keys) {
        holder.getLevel1Cache(region).evict(keys);
    }

    /**
     * Clears an entire region of the local level-1 cache.
     *
     * @param region region name
     */
    public void clear(String region) {
        holder.getLevel1Cache(region).clear();
    }

    /**
     * Stores the holder, starts the producer, then subscribes and starts both consumers.
     * A {@link MQClientException} is logged and not rethrown.
     *
     * @param props  not read by this implementation
     * @param holder provider holder used by all later cache operations
     */
    @Override
    public void connect(Properties props, CacheProviderHolder holder) {
        this.holder = holder;
        try {
            this.producer.start();

            this.binlogConsumer.subscribe(this.topicBinlog, "*");
            this.binlogConsumer.registerMessageListener(binlogMessageListener);
            this.binlogConsumer.start();

            this.cmdConsumer.subscribe(this.topicCmd, "*");
            this.cmdConsumer.registerMessageListener(this);
            this.cmdConsumer.start();
        } catch (MQClientException e) {
            // Any of the three clients may have failed, not only the producer.
            log.error("Failed to start RocketMQ producer/consumers", e);
        }
    }

    /**
     * Stamps the command with this instance's source identifier and sends its JSON
     * form to the command topic. Send failures are logged and not rethrown.
     *
     * @param cmd the command to publish
     */
    @Override
    public void publish(Command cmd) {
        cmd.setSrc(localCommandId);
        String json = cmd.json();
        // Explicit UTF-8 so the payload does not depend on the platform default charset.
        Message msg = new Message(topicCmd, "", "", json.getBytes(StandardCharsets.UTF_8));
        try {
            this.producer.send(msg, SEND_TIMEOUT_MILLIS);
        } catch (Exception e) {
            log.error("Failed to publish {} to RocketMQ", json, e);
        }
    }

    /**
     * Parses each message body as a {@link Command} and hands it to {@code handleCommand}.
     *
     * @param list    messages delivered by the command consumer
     * @param context consume context (unused)
     * @return {@code CONSUME_SUCCESS} when every message was handled,
     *         {@code RECONSUME_LATER} when any of them raised an exception
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt msg : list) {
                String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                handleCommand(Command.parse(body));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            log.error("Failed to consume command message(s) from RocketMQ", e);
        }
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    /**
     * Publishes a quit command, then shuts down the producer and both consumers.
     * Each shutdown is attempted even if an earlier step throws.
     */
    @Override
    public void disconnect() {
        try {
            publish(Command.quit());
        } finally {
            try {
                this.producer.shutdown();
            } finally {
                try {
                    this.binlogConsumer.shutdown();
                } finally {
                    // The original never shut this consumer down.
                    this.cmdConsumer.shutdown();
                }
            }
        }
    }

    /**
     * Handles binlog INSERT rows; currently only logs them at debug level.
     *
     * @param database source database name
     * @param table    source table name
     * @param dataList changed rows, one column-name-to-value map per row
     */
    public void processBinLogInsert(String database, String table, List<Map<String, String>> dataList) {
        log.debug("{}", dataList);
    }

    /**
     * Handles binlog UPDATE rows.
     * <p>
     * Scenario 1 (common key): evict the keys from the level-2 cache, then from the
     * level-1 cache, and finally send an evict command through the L1 cluster policy.
     * <p>
     * Scenario 2 (hot key): for every row carrying a non-null {@code "hotted"} entry,
     * load the cache data and write it back through {@code setHotData}.
     *
     * @param database source database name
     * @param table    source table name
     * @param dataList changed rows, one column-name-to-value map per row;
     *                 a {@code null} list is ignored
     */
    public void processBinLogUPDATE(String database, String table, List<Map<String, String>> dataList) {
        log.debug("{}", dataList);
        if (null == dataList) {
            return;
        }

        DataLoader loader = holder.getDataLoader();

        String region = loader.getRegion(database, table);
        String[] noHotKeys = loader.getNoHotKeys(database, table, dataList);

        if (null == region) {
            return;
        }

        if (null != noHotKeys && noHotKeys.length > 0) {
            // Scenario 1: common key
            try {
                holder.getLevel2Cache(region).evict(noHotKeys);
                holder.getLevel1Cache(region).evict(noHotKeys);
            } finally {
                // Send the broadcast even if a local eviction failed.
                holder.getL1ClusterPolicy().sendEvictCmd(region, noHotKeys);
            }
        }

        // Scenario 2: hot key
        // Open questions: 1) how to decide that a key is hot; 2) how to obtain the
        // cached data; 3) distributed write of the level-1 cache.
        for (Map<String, String> row : dataList) {
            if (null == row.get("hotted")) {
                continue;
            }

            String json = loader.getCacheData(database, table, row);
            String key = loader.getKey(database, table, row);
            holder.getCacheChannel().setHotData(region, key, json);
        }
    }

    /**
     * Handles binlog DELETE rows; currently only logs them at debug level.
     *
     * @param database source database name
     * @param table    source table name
     * @param dataList changed rows, one column-name-to-value map per row
     */
    public void processBinLogDELETE(String database, String table, List<Map<String, String>> dataList) {
        log.debug("{}", dataList);
    }

    /**
     * Promotes the keys carried by the command to hot data, then forwards the command
     * to the L3 policy.
     * <p>
     * Each key is expected in the form {@code region:id}. The value comes from the
     * cache channel when present there, otherwise from the data loader. Keys without
     * a {@code ':'} separator are logged and skipped.
     *
     * @param cmd command whose keys should be made hot
     */
    @Override
    public void hotCache(Command cmd) {
        DataLoader loader = holder.getDataLoader();
        String[] cmdKeys = cmd.getKeys();
        if (null != cmdKeys) {
            for (String cmdKey : cmdKeys) {
                String[] parts = null == cmdKey ? new String[0] : cmdKey.split(":", 2);
                if (parts.length < 2) {
                    log.warn("Skipping malformed hot-cache key (expected region:id): {}", cmdKey);
                    continue;
                }
                String region = parts[0];
                String id = parts[1];

                // The lookup uses the full command key; the write-back below uses only the id part.
                CacheObject cache = holder.getCacheChannel().getInner(region, cmdKey, false);
                Object raw = cache.rawValue();
                String json = raw != null ? (String) raw : loader.getCacheData(region, id);

                holder.getCacheChannel().setHotData(region, id, json);
            }
        }
        holder.getL3Policy().hotCache(cmd);
    }
}
